StreamSets Data Collector / SDC-15576

Setting query interval to 0 in Elasticsearch might lead to InterruptedException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3 (Limited Impact)
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.19.0
    • Component/s: None
    • Labels:
    • Testing Status: STF Testing Required
    • Team: Data Plane

      Description

      It is unclear how prevalent incremental mode with no wait is, but setting the query interval to a plain "0" might cause the origin to fail with InterruptedException:

      com.streamsets.pipeline.api.StageException: ELASTICSEARCH_22 - Failed to fetch batch: 'java.lang.InterruptedException'
      	at com.streamsets.pipeline.stage.origin.elasticsearch.ElasticsearchSource.produce(ElasticsearchSource.java:162)
      	at com.streamsets.pipeline.api.base.configurablestage.DPushSource.produce(DPushSource.java:44)
      	at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$1(StageRuntime.java:270)
      	at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
      	at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:244)
      	at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:279)
      	at com.streamsets.datacollector.runner.SourcePipe.process(SourcePipe.java:79)
      	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPushSource(ProductionPipelineRunner.java:432)
      	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:398)
      	at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:535)
      	at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:112)
      	at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
      	at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.startInternal(StandaloneRunner.java:762)
      	at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:755)
      	at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:157)
      	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
      	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:34)
      	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
      	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
      	at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:34)
      	at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.InterruptedException
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
      	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
      	at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:659)
      	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:219)
      	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:191)
      	at com.streamsets.pipeline.lib.elasticsearch.ElasticsearchStageDelegate.performRequest(ElasticsearchStageDelegate.java:230)
      	at com.streamsets.pipeline.stage.origin.elasticsearch.ElasticsearchSource$ElasticsearchTask.getInitialResponse(ElasticsearchSource.java:353)
      	at com.streamsets.pipeline.stage.origin.elasticsearch.ElasticsearchSource$ElasticsearchTask.produceBatch(ElasticsearchSource.java:283)
      	at com.streamsets.pipeline.stage.origin.elasticsearch.ElasticsearchSource$ElasticsearchTask.run(ElasticsearchSource.java:226)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	... 3 more
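
      The trace shows the task thread blocked in a timed CountDownLatch.await inside the synchronous RestClient.performRequest call when the interrupt arrived. The ticket does not say what interrupts the thread, so the following is only a minimal sketch of the general JDK mechanism, assuming the worker is interrupted while it waits for a response. The class and method names (InterruptedFetchSketch, blockingFetch, runAndInterrupt) are hypothetical and are not Data Collector code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class InterruptedFetchSketch {

  // Hypothetical stand-in for a synchronous request that waits on a latch
  // until a response arrives (assumption: not the real Elasticsearch client).
  static String blockingFetch(CountDownLatch responseArrived) throws InterruptedException {
    // await throws InterruptedException if the thread is interrupted
    // before or during the wait.
    if (!responseArrived.await(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("timed out waiting for response");
    }
    return "batch";
  }

  // Starts a fetch on a worker thread, interrupts the worker while it is
  // blocked, and reports how the failure surfaces to the caller.
  static String runAndInterrupt() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch taskStarted = new CountDownLatch(1);
    CountDownLatch responseArrived = new CountDownLatch(1); // never counted down
    try {
      Future<String> future = executor.submit(() -> {
        taskStarted.countDown();
        return blockingFetch(responseArrived);
      });
      taskStarted.await();      // make sure the task is already running
      executor.shutdownNow();   // interrupts the running worker thread
      return future.get(5, TimeUnit.SECONDS);
    } catch (ExecutionException e) {
      // The worker's InterruptedException arrives wrapped as the cause.
      return "Failed to fetch batch: '" + e.getCause() + "'";
    } catch (InterruptedException | TimeoutException e) {
      throw new IllegalStateException("unexpected failure in sketch", e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // prints: Failed to fetch batch: 'java.lang.InterruptedException'
    System.out.println(runAndInterrupt());
  }
}
```

      In this sketch the interrupt comes from shutdownNow. In the reported failure the source of the interrupt is not stated, so the sketch illustrates only why a blocked request ends as 'java.lang.InterruptedException' in the error message.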
      

            People

            Assignee: Konstantin Golub (konstantin)
            Reporter: Jarcec (jarcec)